# coroutines
g
Hi! Can someone explain to me why this code seems to work for a concurrency < 64 but reaches a deadlock for a concurrency >= 64? (on my MacBook Pro)
```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.future.future
import kotlin.random.Random

private val consumingScope = CoroutineScope(Dispatchers.IO)
private val producingScope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
private var counter = 0

fun main(): Unit = startConsumer(concurrency = 64)

private fun processMessage(
  pulsarMessage: String,
  workerIndex: Int
) = producingScope.future {
  println("Worker $workerIndex: start($pulsarMessage)")
  delay(Random.nextLong(1000))
  println("Worker $workerIndex: end  ($pulsarMessage)")
}.join() // blocks the calling thread until the future completes

private fun startConsumer(concurrency: Int) = consumingScope.future {
  val channel = Channel<String>()

  // start processing coroutines
  repeat(concurrency) {
    launch {
      for (message in channel) {
        processMessage(message, it)
      }
    }
  }
  // loop sending messages to the channel
  while (isActive) {
    counter++
    channel.send("$counter")
  }
}.join()
```
(Note that the deadlock disappears when I use a different dispatcher for each scope, Dispatchers.Default and Dispatchers.IO. The execution time also differs a lot: by far the fastest combination is consumingScope on Dispatchers.IO and producingScope on Dispatchers.Default.)
s
In `processMessage`, you're calling `CompletableFuture.join()`. This blocks the thread until the future is complete. The future is running in the `producingScope`, so it needs a thread from the IO dispatcher in order to complete its work. But `processMessage`, called from the `consumingScope`, is also running on the IO dispatcher. So the thread it blocks comes from the same thread pool that the future is trying to run on.
Dispatchers.IO is capped at 64 threads by default (or the number of CPU cores, if that is larger). With 64 or more consumer coroutines, every one of those threads can end up blocked in `join()`, waiting for futures that can never complete because no thread is left to run them. That is the deadlock.
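The same starvation can be reproduced on a small scale. This is a minimal sketch that assumes a fixed pool of daemon threads stands in for Dispatchers.IO, and `starvesPool` is a hypothetical helper name. Each outer coroutine starts an inner future on the same pool and then blocks its own thread in `join()`, just like `processMessage`. A timed `get()` reports the timeout instead of hanging forever.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.future.future
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical helper: returns true if `blockers` coroutines, each blocking on a future
// that needs the same pool, starve a pool of `poolSize` threads.
fun starvesPool(poolSize: Int, blockers: Int): Boolean {
    // Daemon threads, so threads stuck in join() cannot keep the JVM alive.
    val executor = Executors.newFixedThreadPool(poolSize) { r ->
        Thread(r).apply { isDaemon = true }
    }
    val scope = CoroutineScope(executor.asCoroutineDispatcher())
    try {
        val outer = List(blockers) {
            scope.future {
                // Like processMessage: start work on the same pool, then block this thread on it.
                scope.future { delay(10) }.join()
            }
        }
        return try {
            outer.forEach { it.get(1, TimeUnit.SECONDS) }
            false // every inner future found a free thread to run on
        } catch (e: TimeoutException) {
            true // all threads were blocked in join(), so the inner futures never ran
        }
    } finally {
        scope.cancel()
        executor.shutdownNow()
    }
}
```

With `poolSize = 2`, a single blocker leaves one thread free for the inner future. Two blockers occupy both threads and time out. This mirrors 63 versus 64 workers on Dispatchers.IO.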
I'm guessing the code you shared is a simplified example. But in general, if you want a function that runs some asynchronous work and waits for it to finish, the more usual approach is to make it a suspending function.
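Here is a minimal sketch of that suggestion. It assumes the asynchronous work can itself be written as suspending code, with `delay` standing in for it. `runConsumers` is a hypothetical helper. With 128 workers on Dispatchers.IO, more than its default 64-thread cap, nothing deadlocks: a worker waiting in `processMessage` suspends and releases its thread instead of blocking it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.atomic.AtomicInteger

// Suspending version of processMessage: no future and no join(), so a worker that is
// waiting suspends and hands its thread back to the dispatcher.
suspend fun processMessage(pulsarMessage: String, workerIndex: Int) {
    delay(10) // stands in for the real asynchronous work
}

// Hypothetical helper: sends `messageCount` messages to `concurrency` workers running on
// Dispatchers.IO and returns how many messages were processed.
suspend fun runConsumers(concurrency: Int, messageCount: Int): Int = coroutineScope {
    val processed = AtomicInteger()
    val channel = Channel<String>()
    val workers = List(concurrency) { index ->
        launch(Dispatchers.IO) {
            for (message in channel) {
                processMessage(message, index)
                processed.incrementAndGet()
            }
        }
    }
    for (i in 1..messageCount) channel.send("$i")
    channel.close() // ends each worker's for loop once the channel is drained
    workers.joinAll()
    processed.get()
}
```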
g
@Sam thanks, let me digest that. (Unfortunately, the asynchronous work is done by some Java code out of my control.)
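The thread doesn't say which API the Java code exposes. If it returns a `CompletableFuture` (an assumption), the `await()` extension from `kotlinx.coroutines.future` suspends until the future completes instead of blocking the thread the way `join()` does. `javaApiCall` below is a hypothetical stand-in for the third-party method.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.future.await
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for a third-party Java method that returns a CompletableFuture.
fun javaApiCall(input: String): CompletableFuture<String> =
    CompletableFuture.supplyAsync { input.uppercase() }

// Suspends the caller until the future completes; unlike join(), no thread is blocked.
suspend fun callJava(input: String): String = javaApiCall(input).await()
```

If the Java code is blocking instead (it returns a plain value), the usual alternative is to wrap the call in `withContext(Dispatchers.IO) { ... }`.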
Something that was unclear to me was that consumingScope and producingScope were actually sharing the same set of threads.
I thought a new set of threads would be created along with each scope. I understand now that the threads belong to the dispatcher.
s
Each dispatcher is responsible for its own threads. If you share the same dispatcher between two scopes, they will share the same threads. To have each scope use different threads, you would need to use two different dispatchers. It's unlikely that would be the right approach, though. The whole point of coroutines is to avoid creating new threads for each task, after all.
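For completeness, here is a minimal sketch of what giving each scope its own threads would look like. It assumes one dedicated fixed-size pool per scope, and `poolScope`, `threadNameIn`, and the pool names are hypothetical. As Sam says, this is unlikely to be the right approach. It only illustrates that threads belong to the dispatcher, not to the scope.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical helper: a scope backed by its own fixed-size pool of daemon threads.
// The caller must close() the returned dispatcher to shut the pool down.
fun poolScope(name: String, threads: Int): Pair<CoroutineScope, ExecutorCoroutineDispatcher> {
    val dispatcher = Executors.newFixedThreadPool(threads) { r ->
        Thread(r, "$name-pool").apply { isDaemon = true }
    }.asCoroutineDispatcher()
    return CoroutineScope(SupervisorJob() + dispatcher) to dispatcher
}

// Reports which thread a coroutine started in `scope` actually runs on.
suspend fun threadNameIn(scope: CoroutineScope): String =
    scope.async { Thread.currentThread().name }.await()
```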
What's your motivation for using Dispatchers.IO here?
g
As you may guess, the non-simplified code pulls messages from a streaming system and processes them. I have multiple coroutines that pull messages from multiple topics. It seemed natural to use Dispatchers.IO for the IO-intensive work of pulling messages. When processing a message, I run some Java code out of my control (I'm building a framework), but I also have to send response messages to more topics. That's why I was tempted to use Dispatchers.IO for both scopes.
s
Okay, a somewhat more fundamental question then: what's your motivation for using coroutines? If both the producer and the consumer use blocking Java code, putting coroutines in the middle might not offer any benefit.
g
That's a good and fair question; I was wondering that myself. For consumers, everything is under my control, and I can benefit from coroutines. For producers, I may have to send more than one message for each `processMessage` call, and I was using coroutines to send them in parallel. Also, I was under the impression that coroutines give me a better framework for error management.
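On the error-management point, here is a minimal sketch of what structured concurrency provides. It assumes each response can be sent by a suspending function, and `sendResponse` and `sendAll` are hypothetical names. `coroutineScope` waits for all the parallel sends. If one fails, the others are cancelled and the exception is rethrown to the caller, with no manual bookkeeping.

```kotlin
import kotlinx.coroutines.*

// Hypothetical sender: stands in for publishing one response message to one topic.
suspend fun sendResponse(topic: String): String {
    delay(10)
    if (topic.isBlank()) throw IllegalArgumentException("blank topic")
    return "sent to $topic"
}

// Sends one response per topic in parallel. coroutineScope waits for every send; if one
// fails, the remaining sends are cancelled and the exception propagates to the caller.
suspend fun sendAll(topics: List<String>): List<String> = coroutineScope {
    topics.map { topic -> async { sendResponse(topic) } }.awaitAll()
}
```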
s
So the body of `processMessage`, which currently contains the random delay, may eventually contain multiple coroutines that run concurrently?
g
Yes
s
I think a general principle that would help you here is to have each operation specify for itself which dispatcher it needs to run on. For example:
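```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical dispatcher for user-supplied code; the pool size is only an example.
val myCustomThreadPoolDispatcher = Executors.newFixedThreadPool(8).asCoroutineDispatcher()

suspend fun pullMessages() = withContext(Dispatchers.IO) {
    // run some blocking code to pull messages
}

suspend fun runSomeJavaCode() = withContext(myCustomThreadPoolDispatcher) {
    // run some user-supplied Java code
}
```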
That way, you can write your framework code using suspending functions on the default dispatcher.
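Here is a minimal sketch of that principle in use. It assumes a blocking pull client and a dedicated pool for user-supplied Java code. `pullMessage`, `runUserCode`, `handleMessages`, `userCodeDispatcher`, and the pool size of 4 are all hypothetical. The framework loop is ordinary suspending code on Dispatchers.Default. Only the leaf functions choose a dispatcher, and nothing calls `join()`, so no thread ever waits on work queued behind it in its own pool.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.Executors

// Hypothetical dedicated pool for user-supplied Java code (the size 4 is arbitrary).
val userCodeDispatcher = Executors.newFixedThreadPool(4) { r ->
    Thread(r, "user-code").apply { isDaemon = true }
}.asCoroutineDispatcher()

// Blocking pull wrapped in withContext: the caller suspends while an IO thread blocks.
suspend fun pullMessage(id: Int): String = withContext(Dispatchers.IO) {
    Thread.sleep(5) // stands in for a blocking client call
    "message-$id"
}

// User-supplied code runs on its own pool, so it cannot starve the IO or Default threads.
suspend fun runUserCode(message: String): String = withContext(userCodeDispatcher) {
    "${Thread.currentThread().name}:$message"
}

// Framework loop: plain suspending code on Dispatchers.Default, with no join() anywhere.
suspend fun handleMessages(count: Int): List<String> = withContext(Dispatchers.Default) {
    (1..count).map { id -> async { runUserCode(pullMessage(id)) } }.awaitAll()
}
```

Because user code gets its own pool, a slow or blocking callback can exhaust only that pool, not the IO threads used for pulling messages.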
g
Thank you @Sam, that's helpful. I feel like I'm only now starting to really understand coroutines :)